package com.example.demo.common;

import com.example.demo.entity.Column;
import com.example.demo.entity.JobTable;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import lombok.extern.slf4j.Slf4j;

/**
 * @author guiguketang
 * @date 2020/11/13 15:27
 * 单表同步
 * TODO 1.自定义数据同步，如果表有外键约束则不能成功同步需要做处理；
 * TODO 否则数据同步成功后无法恢复外键约束设置；
 * TODO 2.正在执行同步任务的表加锁，否则会发生一表任务多发的问题；
 * TODO 3.同步任务表数量过大（500万）需要优化处理，暂不支持。
 * TODO 4.同步任务分散，每次只取3条待同步的任务表进行同步
 * TODO 5.智能学习，大表、异常记忆
 * TODO 6.修改日志记录方式
 */
@Slf4j
public class SynDataSingle {
    public int syncDataByJobTable(JobTable jt) {
        Connection conn = null;
        Statement stmt = null;
        Connection connDes = null;
        Statement stmtDes = null;
        String start_time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());

        try {
            //the information of source database;
            Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver").newInstance();
            String url = "jdbc:sqlserver://" + jt.getDbip_read() + ":1433; DatabaseName=" + jt.getDbname_read() + "_bigdata_snap";
            String user = "erp_dp";
            String password = "3DD609B7-FED0-49FA-9F21-5E08A4F7609E";
            conn = DriverManager.getConnection(url, user, password);
            //conn.setAutoCommit(false);
            stmt = conn.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE);

            //the information of destination database;
            String urlDes = "jdbc:sqlserver://" + jt.getDbip_write() + ":1433; DatabaseName=" + jt.getDbname_write();
            String userDes = "erp_dp";
            String passwordDes = "3DD609B7-FED0-49FA-9F21-5E08A4F7609E";
            connDes = DriverManager.getConnection(urlDes, userDes, passwordDes);
            connDes.setAutoCommit(false);
            stmtDes = connDes.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_READ_ONLY);
            String des_sql = "insert into " + jt.getJob_table() + "(" + jt.getColumns() + ") values(";
            if (jt.getIs_identity() == 1) {
                System.out.println(" setting identity");
                stmtDes.addBatch("set IDENTITY_INSERT " + jt.getJob_table() + "  on");
            }
//            System.out.println("==The presql is:===" + jt.getPresql());
            String strPreSql[] = jt.getPresql().split(",");
            for (String sql : strPreSql) {
//                System.out.println("the preSql is>>>>"+sql);
                if (sql.length() != 0) {
                    stmtDes.addBatch(sql.replaceAll("'", ""));
                }
            }

            //获取同步表列数据类型；
            String columnSql = "SELECT a.colorder, a.name,b.name type " +
                    "FROM syscolumns a " +
                    "left join systypes b on a.xusertype = b.xusertype " +
                    "inner join sysobjects d on a.id = d.id  and d.xtype = 'U' and d.name <> 'dtproperties' " +
                    "left join syscomments e on a.cdefault = e.id " +
                    "left join sys.extended_properties g on a.id = G.major_id and a.colid = g.minor_id " +
                    "left join sys.extended_properties f on d.id = f.major_id and f.minor_id = 0 " +
                    "where d.name = '" + jt.getJob_table() + "' " +
                    "order by a.id,a.colorder";

            //取表第一列用于分页
            List<Column> columnList = new ArrayList<Column>();
            ResultSet rsColumn = stmtDes.executeQuery(columnSql);
            String row_id = "id";
            while (rsColumn.next()) {
                if (rsColumn.isFirst()) {
                    row_id = rsColumn.getString("name");
                }
                Column columnObj = new Column();
                columnObj.setOrder(rsColumn.getInt("colorder"));
                columnObj.setName(rsColumn.getString("name"));
                columnObj.setType(rsColumn.getString("type"));
                columnList.add(columnObj);
            }

            String base_sql = "select * from " + jt.getJob_table();
            String count_sql = "select count(*) from " + jt.getJob_table();
            String sqlTmp = "";
            String where_sql="";
            if (jt.getJob_table_condition() != null && jt.getJob_table_condition().length() > 0) {
                base_sql = jt.getJob_table_condition();
                sqlTmp = jt.getJob_table_condition().replace(";", "");
                count_sql = "select count(*) from (" + sqlTmp + ") t";
            }
            log.info("before batch,the select sql is==="+base_sql);
            if(base_sql.indexOf("where")>0) {
                where_sql = base_sql.substring(base_sql.indexOf("where"), base_sql.length());
            }
            log.info("the where sql is==="+where_sql);
            //the total number of synchronizing table.
            int count = 59;
            System.out.println("the count sql is ===");
            System.out.println(count_sql);
            ResultSet rsc = stmt.executeQuery(count_sql);
            while (rsc.next()) {
                count = rsc.getInt(1);
            }
            if (count > 5000000) {
                System.out.println(jt.getJob_table() + "同步任务暂停，不支持五百万级以上数据自定义同步！");
                return 0;
            }


            //如果待同步表数据超过10万，则分批同步数据，batch start;
            int batch = 100000;
            for (int i = 0; i < count / batch + 1; i++) {
                base_sql = "select top "+batch+" * from " +
                        " (select row_number() over(order by "+row_id+") as rownumber,* from "+jt.getJob_table()+where_sql+") temp_row"
                        +" where rownumber>"+batch*i;
                log.info("in batch the select sql is===="+base_sql);
                ResultSet rs = stmt.executeQuery(base_sql);
                //入库结果集处理;
                while (rs.next()) {
                    String insert_sql = "";
                    //拼接insert，如果是最后一列不加“,”，否则需要加逗号。
                    for (Column colObj : columnList) {
                        switch (colObj.getType()) {
                            case "ID":
                            case "bit":
                            case "int":
                            case "STATUS":
                            case "bigint":
                            case "smallint":
                            case "SHORTINT":
                            case "SMALL_INT":
                                insert_sql += columnList.get(columnList.size() - 1).equals(colObj) ? rs.getInt(colObj.getName()) : rs.getInt(colObj.getName()) + ",";
                                break;
                            case "numeric":
                            case "decimal":
                            case "DECIMAL1":
                                insert_sql += columnList.get(columnList.size() - 1).equals(colObj) ? rs.getBigDecimal(colObj.getName()) : rs.getBigDecimal(colObj.getName()) + ",";
                                break;
                            case "DATE":
                            case "datetime":
                            case "DATE_TIME":
                            case "DATETIME1":
                                String data_value = "";
                                if (rs.getTimestamp(colObj.getName()) == null) {
                                    //do nothing!
                                } else {
                                    data_value = rs.getTimestamp(colObj.getName()).toString();
                                }
                                insert_sql += columnList.get(columnList.size() - 1).equals(colObj) ? "'" + data_value + "'" : "'" + data_value + "',";
                                break;
                            default:
                                String str_value = "";
                                if (rs.getString(colObj.getName()) == null) {
                                    //do nothing!
                                } else {
                                    str_value = rs.getString(colObj.getName()).toString().replace("'", "''");
                                }
                                insert_sql += columnList.get(columnList.size() - 1).equals(colObj) ? "'" + str_value + "'" : "'" + str_value + "',";
                        }
                    }
                    insert_sql = des_sql + insert_sql + ")";
                    stmtDes.addBatch(insert_sql);
                }
                //每20万条记录提交一次，防止GC overhead limit exceeded
                if (i > 0 && (i + 1) % 2 == 0) {
                    System.out.println("执行commit,the i is>>>>>" + i);
                    stmtDes.executeBatch();
                    connDes.commit();
                    stmtDes.clearBatch();
                    log.info("executing batch commit! 第" + (i + 1) * 10 + "万条记录");
                }
            }
            //batch end;
            //关闭标识列
            if (jt.getIs_identity() == 1) {
                System.out.println(" resetting identity");
                stmtDes.addBatch("set IDENTITY_INSERT " + jt.getJob_table() + " off");
            }
            if (jt.getPostsql() != null && jt.getPostsql().length() > 0) {
                String strPostSql[] = jt.getPostsql().split(",");
                for (String sql : strPostSql) {
                    if (sql.trim().length() != 0) {
                        stmtDes.addBatch(sql.replaceAll("'", ""));
                    }
                }
            }
            String filter_columns[] = {};
            if (jt.getColumn_name() != null && jt.getColumn_name().length() > 0)
                filter_columns = jt.getColumn_name().split(",");
            String symbol = "";
            if (filter_columns.length > 0) {
                String filter_sql = "update " + jt.getJob_table() + " set ";
                for (String data : filter_columns) {
                    filter_sql += symbol + data + "='***'";
                    symbol = ",";
                }
                System.out.println("filter_sql is====" + filter_sql);
                stmtDes.addBatch(filter_sql);
            }
            stmtDes.executeBatch();
            connDes.commit();
            System.out.println("结束时间：" + System.nanoTime());
            stmtDes.close();
            connDes.close();
            stmt.close();
            conn.close();
            return 1;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        } finally {
            try {
                if (stmt != null) {
                    stmt.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
                return 0;
            }
            System.out.println("start_time is====>" + start_time);
            System.out.println("end_time is===>" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        }
    }
}